/*

 * Licensed to the Apache Software Foundation (ASF) under one

 * or more contributor license agreements.  See the NOTICE file

 * distributed with this work for additional information

 * regarding copyright ownership.  The ASF licenses this file

 * to you under the Apache License, Version 2.0 (the

 * "License"); you may not use this file except in compliance

 * with the License.  You may obtain a copy of the License at

 *

 *     http://www.apache.org/licenses/LICENSE-2.0

 *

 * Unless required by applicable law or agreed to in writing, software

 * distributed under the License is distributed on an "AS IS" BASIS,

 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.

 * See the License for the specific language governing permissions and

 * limitations under the License.

 */

package com.bff.gaia.unified.sdk.io.gcp.bigquery;

import static com.bff.gaia.unified.sdk.io.gcp.bigquery.BigQueryHelpers.createJobIdToken;

import static com.bff.gaia.unified.sdk.io.gcp.bigquery.BigQueryHelpers.createTempTableReference;

import static com.bff.gaia.unified.sdk.io.gcp.bigquery.BigQueryHelpers.getExtractJobId;

import static com.bff.gaia.unified.sdk.io.gcp.bigquery.BigQueryHelpers.resolveTempLocation;

import static com.bff.gaia.unified.vendor.guava.com.google.common.base.Preconditions.checkArgument;

import static com.bff.gaia.unified.vendor.guava.com.google.common.base.Preconditions.checkState;



import com.google.api.client.json.JsonFactory;

import com.google.api.services.bigquery.model.Job;

import com.google.api.services.bigquery.model.JobConfigurationQuery;

import com.google.api.services.bigquery.model.JobReference;

import com.google.api.services.bigquery.model.JobStatistics;

import com.google.api.services.bigquery.model.Table;

import com.google.api.services.bigquery.model.TableReference;

import com.google.api.services.bigquery.model.TableRow;

import com.google.api.services.bigquery.model.TableSchema;

import com.google.api.services.bigquery.model.TimePartitioning;

import com.google.auto.value.AutoValue;

import com.google.cloud.bigquery.storage.v1beta1.ReadOptions.TableReadOptions;

import com.google.cloud.bigquery.storage.v1beta1.Storage.CreateReadSessionRequest;

import com.google.cloud.bigquery.storage.v1beta1.Storage.ReadSession;

import com.google.cloud.bigquery.storage.v1beta1.Storage.Stream;

import java.io.IOException;

import java.util.List;

import java.util.Map;

import java.util.regex.Pattern;

import java.util.stream.Collectors;

import javax.annotation.Nullable;

import org.apache.avro.generic.GenericRecord;

import com.bff.gaia.unified.sdk.Pipeline;

import com.bff.gaia.unified.sdk.PipelineRunner;

import com.bff.gaia.unified.sdk.annotations.Experimental;

import com.bff.gaia.unified.sdk.coders.CannotProvideCoderException;

import com.bff.gaia.unified.sdk.coders.Coder;

import com.bff.gaia.unified.sdk.coders.CoderRegistry;

import com.bff.gaia.unified.sdk.coders.KvCoder;

import com.bff.gaia.unified.sdk.coders.StringUtf8Coder;

import com.bff.gaia.unified.sdk.extensions.gcp.options.GcpOptions;

import com.bff.gaia.unified.sdk.extensions.gcp.util.Transport;

import com.bff.gaia.unified.sdk.extensions.gcp.util.gcsfs.GcsPath;

import com.bff.gaia.unified.sdk.extensions.protobuf.ProtoCoder;

import com.bff.gaia.unified.sdk.io.BoundedSource;

import com.bff.gaia.unified.sdk.io.FileSystems;

import com.bff.gaia.unified.sdk.io.fs.MoveOptions;

import com.bff.gaia.unified.sdk.io.fs.ResolveOptions;

import com.bff.gaia.unified.sdk.io.fs.ResourceId;

import com.bff.gaia.unified.sdk.io.gcp.bigquery.BigQueryHelpers.JsonTableRefToTableRef;

import com.bff.gaia.unified.sdk.io.gcp.bigquery.BigQueryHelpers.TableRefToJson;

import com.bff.gaia.unified.sdk.io.gcp.bigquery.BigQueryHelpers.TableSchemaToJsonSchema;

import com.bff.gaia.unified.sdk.io.gcp.bigquery.BigQueryHelpers.TableSpecToTableRef;

import com.bff.gaia.unified.sdk.io.gcp.bigquery.BigQueryHelpers.TimePartitioningToJson;

import com.bff.gaia.unified.sdk.io.gcp.bigquery.BigQueryIO.TypedRead.Method;

import com.bff.gaia.unified.sdk.io.gcp.bigquery.BigQueryServices.DatasetService;

import com.bff.gaia.unified.sdk.io.gcp.bigquery.BigQueryServices.JobService;

import com.bff.gaia.unified.sdk.io.gcp.bigquery.BigQueryServices.StorageClient;

import com.bff.gaia.unified.sdk.io.gcp.bigquery.BigQuerySourceBase.ExtractResult;

import com.bff.gaia.unified.sdk.io.gcp.bigquery.DynamicDestinationsHelpers.ConstantSchemaDestinations;

import com.bff.gaia.unified.sdk.io.gcp.bigquery.DynamicDestinationsHelpers.ConstantTimePartitioningDestinations;

import com.bff.gaia.unified.sdk.io.gcp.bigquery.DynamicDestinationsHelpers.SchemaFromViewDestinations;

import com.bff.gaia.unified.sdk.io.gcp.bigquery.DynamicDestinationsHelpers.TableFunctionDestinations;

import com.bff.gaia.unified.sdk.io.gcp.bigquery.PassThroughThenCleanup.CleanupOperation;

import com.bff.gaia.unified.sdk.io.gcp.bigquery.PassThroughThenCleanup.ContextContainer;

import com.bff.gaia.unified.sdk.options.PipelineOptions;

import com.bff.gaia.unified.sdk.options.ValueProvider;

import com.bff.gaia.unified.sdk.options.ValueProvider.NestedValueProvider;

import com.bff.gaia.unified.sdk.options.ValueProvider.StaticValueProvider;

import com.bff.gaia.unified.sdk.transforms.Create;

import com.bff.gaia.unified.sdk.transforms.DoFn;

import com.bff.gaia.unified.sdk.transforms.MapElements;

import com.bff.gaia.unified.sdk.transforms.PTransform;

import com.bff.gaia.unified.sdk.transforms.ParDo;

import com.bff.gaia.unified.sdk.transforms.Reshuffle;

import com.bff.gaia.unified.sdk.transforms.SerializableFunction;

import com.bff.gaia.unified.sdk.transforms.SerializableFunctions;

import com.bff.gaia.unified.sdk.transforms.SimpleFunction;

import com.bff.gaia.unified.sdk.transforms.View;

import com.bff.gaia.unified.sdk.transforms.display.DisplayData;

import com.bff.gaia.unified.sdk.values.KV;

import com.bff.gaia.unified.sdk.values.PBegin;

import com.bff.gaia.unified.sdk.values.PCollection;

import com.bff.gaia.unified.sdk.values.PCollection.IsBounded;

import com.bff.gaia.unified.sdk.values.PCollectionTuple;

import com.bff.gaia.unified.sdk.values.PCollectionView;

import com.bff.gaia.unified.sdk.values.TupleTag;

import com.bff.gaia.unified.sdk.values.TupleTagList;

import com.bff.gaia.unified.sdk.values.TypeDescriptors;

import com.bff.gaia.unified.sdk.values.ValueInSingleWindow;

import com.bff.gaia.unified.vendor.guava.com.google.common.annotations.VisibleForTesting;

import com.bff.gaia.unified.vendor.guava.com.google.common.base.MoreObjects;

import com.bff.gaia.unified.vendor.guava.com.google.common.base.Predicates;

import com.bff.gaia.unified.vendor.guava.com.google.common.base.Strings;

import com.bff.gaia.unified.vendor.guava.com.google.common.collect.ImmutableList;

import com.bff.gaia.unified.vendor.guava.com.google.common.collect.Iterables;

import com.bff.gaia.unified.vendor.guava.com.google.common.collect.Lists;

import org.joda.time.Duration;

import org.slf4j.Logger;

import org.slf4j.LoggerFactory;





/**

 * {@link PTransform}s for reading and writing <a

 * href="https://developers.google.com/bigquery/">BigQuery</a> tables.

 *

 * <h3>Table References</h3>

 *

 * <p>A fully-qualified BigQuery table name consists of three components:

 *

 * <ul>

 *   <li>{@code projectId}: the Cloud project id (defaults to {@link GcpOptions#getProject()}).

 *   <li>{@code datasetId}: the BigQuery dataset id, unique within a project.

 *   <li>{@code tableId}: a table id, unique within a dataset.

 * </ul>

 *

 * <p>BigQuery table references are stored as a {@link TableReference}, which comes from the <a

 * href="https://cloud.google.com/bigquery/client-libraries">BigQuery Java Client API</a>. Tables

 * can be referred to as Strings, with or without the {@code projectId}. A helper function is

 * provided ({@link BigQueryHelpers#parseTableSpec(String)}) that parses the following string forms

 * into a {@link TableReference}:

 *

 * <ul>

 *   <li>[{@code project_id}]:[{@code dataset_id}].[{@code table_id}]

 *   <li>[{@code dataset_id}].[{@code table_id}]

 * </ul>

 *

 * <h3>Reading</h3>

 *

 * <p>Reading from BigQuery is supported by {@link #read(SerializableFunction)}, which parses

 * records in <a href="https://cloud.google.com/bigquery/data-formats#avro_format">AVRO format</a>

 * into a custom type using a specified parse function, and by {@link #readTableRows}, which parses
 * them into {@link TableRow}; this may be more convenient but has lower performance.

 *

 * <p>Both functions support reading either from a table or from the result of a query, via {@link

 * TypedRead#from(String)} and {@link TypedRead#fromQuery} respectively. Exactly one of these must

 * be specified.

 *

 * <p><b>Example: Reading rows of a table as {@link TableRow}.</b>

 *

 * <pre>{@code

 * PCollection<TableRow> weatherData = pipeline.apply(

 *     BigQueryIO.readTableRows().from("clouddataflow-readonly:samples.weather_stations"));

 * }</pre>

 *

 * <b>Example: Reading rows of a table and parsing them into a custom type.</b>

 *

 * <pre>{@code

 * PCollection<WeatherRecord> weatherData = pipeline.apply(

 *    BigQueryIO

 *      .read(new SerializableFunction<SchemaAndRecord, WeatherRecord>() {

 *        public WeatherRecord apply(SchemaAndRecord schemaAndRecord) {

 *          return new WeatherRecord(...);

 *        }

 *      })

 *      .from("clouddataflow-readonly:samples.weather_stations")
 *      .withCoder(SerializableCoder.of(WeatherRecord.class)));

 * }</pre>

 *

 * <p>Note: When using {@link #read(SerializableFunction)}, you may sometimes need to use {@link

 * TypedRead#withCoder(Coder)} to specify a {@link Coder} for the result type, if Unified fails to

 * infer it automatically.

 *

 * <p><b>Example: Reading results of a query as {@link TableRow}.</b>

 *

 * <pre>{@code

 * PCollection<TableRow> meanTemperatureData = pipeline.apply(BigQueryIO.readTableRows()

 *     .fromQuery("SELECT year, mean_temp FROM [samples.weather_stations]"));

 * }</pre>

 *

 * <p>Users can optionally specify a query priority using {@link TypedRead#withQueryPriority(

 * TypedRead.QueryPriority)} and a geographic location where the query will be executed using {@link

 * TypedRead#withQueryLocation(String)}. Query location must be specified for jobs that are not

 * executed in US or EU. See <a

 * href="https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs/query">BigQuery Jobs:

 * query</a>.
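 *
 * <p>For example, a minimal sketch of running the query above with {@code BATCH} priority in the
 * {@code asia-northeast1} location (the location value is illustrative):
 *
 * <pre>{@code
 * PCollection<TableRow> meanTemperatureData = pipeline.apply(BigQueryIO.readTableRows()
 *     .fromQuery("SELECT year, mean_temp FROM [samples.weather_stations]")
 *     .withQueryPriority(TypedRead.QueryPriority.BATCH)
 *     .withQueryLocation("asia-northeast1"));
 * }</pre>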

 *

 * <h3>Writing</h3>

 *

 * <p>To write to a BigQuery table, apply a {@link BigQueryIO.Write} transformation. This consumes a

 * {@link PCollection} of a user-defined type when using {@link BigQueryIO#write()} (recommended),

 * or a {@link PCollection} of {@link TableRow TableRows} as input when using {@link

 * BigQueryIO#writeTableRows()} (not recommended). When using a user-defined type, a function must

 * be provided to turn this type into a {@link TableRow} using {@link

 * BigQueryIO.Write#withFormatFunction(SerializableFunction)}.

 *

 * <pre>{@code

 * class Quote {

 *   final Instant timestamp;

 *   final String exchange;

 *   final String symbol;

 *   final double price;

 *

 *   Quote(Instant timestamp, String exchange, String symbol, double price) {

 *     // initialize all member variables.

 *   }

 * }

 *

 * PCollection<Quote> quotes = ...

 *

 * quotes.apply(BigQueryIO

 *     .<Quote>write()

 *     .to("my-project:my_dataset.my_table")

 *     .withSchema(new TableSchema().setFields(

 *         ImmutableList.of(

 *           new TableFieldSchema().setName("timestamp").setType("TIMESTAMP"),

 *           new TableFieldSchema().setName("exchange").setType("STRING"),

 *           new TableFieldSchema().setName("symbol").setType("STRING"),

 *           new TableFieldSchema().setName("price").setType("FLOAT"))))

 *     .withFormatFunction(quote -> new TableRow().set(..set the columns..))

 *     .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE));

 * }</pre>

 *

 * <p>See {@link BigQueryIO.Write} for details on how to specify if a write should append to an

 * existing table, replace the table, or verify that the table is empty. Note that the dataset being

 * written to must already exist. Unbounded PCollections can only be written using {@link

 * Write.WriteDisposition#WRITE_EMPTY} or {@link Write.WriteDisposition#WRITE_APPEND}.

 *

 * <p>BigQueryIO supports automatically inferring the BigQuery table schema from the Unified schema on

 * the input PCollection. In this case, Unified can also automatically format the input into a
 * TableRow if no format function is provided. In the above example, the quotes PCollection has a schema that

 * Unified infers from the Quote POJO. So the write could be done more simply as follows:

 *

 * <pre>{@code

 * {@literal @}DefaultSchema(JavaFieldSchema.class)

 * class Quote {

 *   final Instant timestamp;

 *   final String exchange;

 *   final String symbol;

 *   final double price;

 *

 *   {@literal @}SchemaCreate

 *   Quote(Instant timestamp, String exchange, String symbol, double price) {

 *     // initialize all member variables.

 *   }

 * }

 *

 * PCollection<Quote> quotes = ...

 *

 * quotes.apply(BigQueryIO

 *     .<Quote>write()

 *     .to("my-project:my_dataset.my_table")

 *     .useUnifiedSchema()

 *     .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE));

 * }</pre>

 *

 * <h3>Loading historical data into time-partitioned BigQuery tables</h3>

 *

 * <p>To load historical data into a time-partitioned BigQuery table, specify {@link

 * BigQueryIO.Write#withTimePartitioning} with a {@link TimePartitioning#setField(String) field}

 * used for <a

 * href="https://cloud.google.com/bigquery/docs/partitioned-tables#partitioned_tables">column-based

 * partitioning</a>. For example:

 *

 * <pre>{@code

 * PCollection<Quote> quotes = ...;

 *

 * quotes.apply(BigQueryIO.write()

 *         .withSchema(schema)

 *         .withFormatFunction(quote -> new TableRow()

 *            .set("timestamp", quote.getTimestamp())

 *            .set(..other columns..))

 *         .to("my-project:my_dataset.my_table")

 *         .withTimePartitioning(new TimePartitioning().setField("time")));

 * }</pre>

 *

 * <h3>Writing different values to different tables</h3>

 *

 * <p>A common use case is to dynamically generate BigQuery table names based on the current value.

 * To support this, {@link BigQueryIO.Write#to(SerializableFunction)} accepts a function mapping the

 * current element to a tablespec. For example, here's code that outputs quotes of different stocks

 * to different tables:

 *

 * <pre>{@code

 * PCollection<Quote> quotes = ...;

 *

 * quotes.apply(BigQueryIO.write()

 *         .withSchema(schema)

 *         .withFormatFunction(quote -> new TableRow()...)

 *         .to((ValueInSingleWindow<Quote> quote) -> {

 *             String symbol = quote.getValue().getSymbol();

 *             return new TableDestination(

 *                 "my-project:my_dataset.quotes_" + symbol, // Table spec

 *                 "Quotes of stock " + symbol // Table description

 *               );

 *           }));

 * }</pre>

 *

 * <p>Per-table schemas can also be provided using {@link BigQueryIO.Write#withSchemaFromView}. This

 * allows the schemas to be calculated based on a previous pipeline stage or statically via a
 * {@link Create} transform. This method expects to receive a map-valued {@link PCollectionView},
 * mapping table specifications (project:dataset.table-id) to JSON-formatted {@link TableSchema}
 * objects. All destination tables must be present in this map, or the pipeline will fail to create
 * tables. Care should be taken if the map value is based on a triggered aggregation over an
 * unbounded {@link PCollection}; the side input will contain the entire history of all table
 * schemas ever generated, which can consume significant memory. This method

 * can also be useful when writing to a single table, as it allows a previous stage to calculate the

 * schema (possibly based on the full collection of records being written to BigQuery).
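 *
 * <p>A minimal sketch of this pattern, assuming {@code schemas} is a {@code PCollection} of
 * key-value pairs produced by an earlier stage that maps table specifications to JSON-formatted
 * schemas:
 *
 * <pre>{@code
 * PCollectionView<Map<String, String>> schemaView = schemas.apply(View.asMap());
 *
 * quotes.apply(BigQueryIO.write()
 *     .withFormatFunction(quote -> new TableRow()...)
 *     .to((ValueInSingleWindow<Quote> quote) -> new TableDestination(
 *         "my-project:my_dataset.quotes_" + quote.getValue().getSymbol(), null))
 *     .withSchemaFromView(schemaView));
 * }</pre>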

 *

 * <p>For the most general form of dynamic table destinations and schemas, look at {@link

 * BigQueryIO.Write#to(DynamicDestinations)}.

 *

 * <h3>Insertion Method</h3>

 *

 * {@link BigQueryIO.Write} supports two methods of inserting data into BigQuery specified using

 * {@link BigQueryIO.Write#withMethod}. If no method is supplied, then a default method will be

 * chosen based on the input PCollection. See {@link BigQueryIO.Write.Method} for more information

 * about the methods. The different insertion methods provide different tradeoffs of cost, quota,

 * and data consistency; please see BigQuery documentation for more information about these

 * tradeoffs.
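 *
 * <p>For example, to force batch loads instead of letting the method be chosen automatically (a
 * minimal sketch; it assumes {@link BigQueryIO.Write.Method} exposes a {@code FILE_LOADS} value for
 * the batch-load path):
 *
 * <pre>{@code
 * quotes.apply(BigQueryIO.<Quote>write()
 *     .to("my-project:my_dataset.my_table")
 *     .withSchema(schema)
 *     .withFormatFunction(quote -> new TableRow()...)
 *     .withMethod(BigQueryIO.Write.Method.FILE_LOADS));
 * }</pre>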

 *

 * <h3>Usage with templates</h3>

 *

 * <p>When using {@link #read} or {@link #readTableRows()} in a template, it's required to specify

 * {@link Read#withTemplateCompatibility()}. Specifying this in a non-template pipeline is not

 * recommended because it has somewhat lower performance.
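 *
 * <p>For example (a minimal sketch; {@code options.getInputTable()} stands for a {@code
 * ValueProvider<String>} defined on a custom options interface):
 *
 * <pre>{@code
 * PCollection<TableRow> rows = pipeline.apply(BigQueryIO.readTableRows()
 *     .from(options.getInputTable())
 *     .withTemplateCompatibility());
 * }</pre>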

 *

 * <p>When using {@link #write()} or {@link #writeTableRows()} with batch loads in a template, it is

 * recommended to specify {@link Write#withCustomGcsTempLocation}. Writing to BigQuery via batch

 * loads involves writing temporary files to this location, so the location must be accessible at

 * pipeline execution time. By default, this location is captured at pipeline <i>construction</i>

 * time and may be inaccessible if the template is reused from a different project or at a time
 * when the original location no longer exists. {@link

 * Write#withCustomGcsTempLocation(ValueProvider)} allows specifying the location as an argument to

 * the template invocation.
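 *
 * <p>A minimal sketch of this, assuming the method accepts a {@code ValueProvider<String>} and
 * that a custom options interface exposes the value (the getter name is illustrative):
 *
 * <pre>{@code
 * public interface MyOptions extends PipelineOptions {
 *   ValueProvider<String> getBigQueryTempLocation();
 *   void setBigQueryTempLocation(ValueProvider<String> value);
 * }
 *
 * MyOptions options = ...;
 * quotes.apply(BigQueryIO.<Quote>write()
 *     .to("my-project:my_dataset.my_table")
 *     .withSchema(schema)
 *     .withFormatFunction(quote -> new TableRow()...)
 *     .withCustomGcsTempLocation(options.getBigQueryTempLocation()));
 * }</pre>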

 *

 * <h3>Permissions</h3>

 *

 * <p>Permission requirements depend on the {@link PipelineRunner} that is used to execute the

 * pipeline. Please refer to the documentation of corresponding {@link PipelineRunner}s for more

 * details.

 *

 * <p>Please see <a href="https://cloud.google.com/bigquery/access-control">BigQuery Access Control

 * </a> for security and permission related information specific to BigQuery.

 */

public class BigQueryIO {

  private static final Logger LOG = LoggerFactory.getLogger(BigQueryIO.class);



  /** Singleton instance of the JSON factory used to read and write JSON formatted rows. */

  static final JsonFactory JSON_FACTORY = Transport.getJsonFactory();



  /**

   * Project IDs must contain 6-63 lowercase letters, digits, or dashes. IDs must start with a

   * letter and may not end with a dash. This regex isn't exact: it allows some patterns that
   * would be rejected by the service, but it is sufficient for basic parsing of table references.

   */

  private static final String PROJECT_ID_REGEXP = "[a-z][-a-z0-9:.]{4,61}[a-z0-9]";



  /** Regular expression that matches Dataset IDs. */

  private static final String DATASET_REGEXP = "[-\\w.]{1,1024}";



  /** Regular expression that matches Table IDs. */

  private static final String TABLE_REGEXP = "[-\\w$@]{1,1024}";



  /**

   * Matches table specifications in the form {@code "[project_id]:[dataset_id].[table_id]"} or

   * {@code "[dataset_id].[table_id]"}.

   */

  private static final String DATASET_TABLE_REGEXP =

      String.format(

          "((?<PROJECT>%s):)?(?<DATASET>%s)\\.(?<TABLE>%s)",

          PROJECT_ID_REGEXP, DATASET_REGEXP, TABLE_REGEXP);



  static final Pattern TABLE_SPEC = Pattern.compile(DATASET_TABLE_REGEXP);
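
  // For example, matching "my-project:my_dataset.my_table" against TABLE_SPEC yields the named
  // groups PROJECT="my-project", DATASET="my_dataset", and TABLE="my_table"; for the
  // "my_dataset.my_table" form the PROJECT group is absent.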



  /**

   * A formatting function that maps a TableRow to itself. This allows sending a {@code

   * PCollection<TableRow>} directly to BigQueryIO.Write.

   */

  static final SerializableFunction<TableRow, TableRow> IDENTITY_FORMATTER = input -> input;



  /**

   * @deprecated Use {@link #read(SerializableFunction)} or {@link #readTableRows} instead. {@link

   *     #readTableRows()} does exactly the same thing as {@link #read}, but {@link

   *     #read(SerializableFunction)} performs better.

   */

  @Deprecated

  public static Read read() {

    return new Read();

  }



  /**

   * Like {@link #read(SerializableFunction)} but represents each row as a {@link TableRow}.

   *

   * <p>This method is more convenient to use in some cases, but usually has significantly lower

   * performance than using {@link #read(SerializableFunction)} directly to parse data into a

   * domain-specific type, due to the overhead of converting the rows to {@link TableRow}.

   */

  public static TypedRead<TableRow> readTableRows() {

    return read(new TableRowParser()).withCoder(TableRowJsonCoder.of());

  }



  /**

   * Reads from a BigQuery table or query and returns a {@link PCollection} with one element per

   * row of the table or query result, parsed from the BigQuery AVRO format using the specified

   * function.

   *

   * <p>Each {@link SchemaAndRecord} contains a BigQuery {@link TableSchema} and a {@link

   * GenericRecord} representing the row, indexed by column name. Here is a sample parse function

   * that parses click events from a table.

   *

   * <pre>{@code

   * class ClickEvent { long userId; String url; ... }

   *

   * p.apply(BigQueryIO.read(new SerializableFunction<SchemaAndRecord, ClickEvent>() {

   *   public ClickEvent apply(SchemaAndRecord record) {

   *     GenericRecord r = record.getRecord();

   *     return new ClickEvent((Long) r.get("userId"), (String) r.get("url"));

   *   }

   * }).from("..."));

   * }</pre>

   */

  public static <T> TypedRead<T> read(SerializableFunction<SchemaAndRecord, T> parseFn) {

    return new AutoValue_BigQueryIO_TypedRead.Builder<T>()

        .setValidate(true)

        .setWithTemplateCompatibility(false)

        .setBigQueryServices(new BigQueryServicesImpl())

        .setParseFn(parseFn)

        .setMethod(Method.DEFAULT)

        .build();

  }



  @VisibleForTesting

  static class TableRowParser implements SerializableFunction<SchemaAndRecord, TableRow> {



    public static final TableRowParser INSTANCE = new TableRowParser();



    @Override

    public TableRow apply(SchemaAndRecord schemaAndRecord) {

      return BigQueryAvroUtils.convertGenericRecordToTableRow(

          schemaAndRecord.getRecord(), schemaAndRecord.getTableSchema());

    }

  }



  /** Implementation of {@link BigQueryIO#read()}. */

  public static class Read extends PTransform<PBegin, PCollection<TableRow>> {

    private final TypedRead<TableRow> inner;



    Read() {

      this(BigQueryIO.read(TableRowParser.INSTANCE).withCoder(TableRowJsonCoder.of()));

    }



    Read(TypedRead<TableRow> inner) {

      this.inner = inner;

    }



    @Override

    public PCollection<TableRow> expand(PBegin input) {

      return input.apply(inner);

    }



    @Override

    public void populateDisplayData(DisplayData.Builder builder) {

      this.inner.populateDisplayData(builder);

    }



    boolean getValidate() {

      return this.inner.getValidate();

    }



    ValueProvider<String> getQuery() {

      return this.inner.getQuery();

    }



    Read withTestServices(BigQueryServices testServices) {

      return new Read(this.inner.withTestServices(testServices));

    }



    ///////////////////////////////////////////////////////////////////



    /** Returns the table to read, or {@code null} if reading from a query instead. */

    @Nullable

    public ValueProvider<TableReference> getTableProvider() {

      return this.inner.getTableProvider();

    }



    /** Returns the table to read, or {@code null} if reading from a query instead. */

    @Nullable

    public TableReference getTable() {

      return this.inner.getTable();

    }



    /**

     * Reads a BigQuery table specified as {@code "[project_id]:[dataset_id].[table_id]"} or {@code

     * "[dataset_id].[table_id]"} for tables within the current project.

     */

    public Read from(String tableSpec) {

      return new Read(this.inner.from(tableSpec));

    }



    /** Same as {@code from(String)}, but with a {@link ValueProvider}. */

    public Read from(ValueProvider<String> tableSpec) {

      return new Read(this.inner.from(tableSpec));

    }



    /** Read from table specified by a {@link TableReference}. */

    public Read from(TableReference table) {

      return new Read(this.inner.from(table));

    }



    /**

     * Reads results received after executing the given query.

     *

     * <p>By default, the query results will be flattened -- see "flattenResults" in the <a

     * href="https://cloud.google.com/bigquery/docs/reference/v2/jobs">Jobs documentation</a> for

     * more information. To disable flattening, use {@link Read#withoutResultFlattening}.

     *

     * <p>By default, the query will use BigQuery's legacy SQL dialect. To use the BigQuery Standard

     * SQL dialect, use {@link Read#usingStandardSql}.
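     *
     * <p>For example, a minimal sketch of reading query results using the Standard SQL dialect
     * (the query text is illustrative):
     *
     * <pre>{@code
     * PCollection<TableRow> rows = p.apply(BigQueryIO.readTableRows()
     *     .fromQuery("SELECT year, mean_temp FROM `samples.weather_stations`")
     *     .usingStandardSql());
     * }</pre>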

     */

    public Read fromQuery(String query) {

      return new Read(this.inner.fromQuery(query));

    }



    /** Same as {@code fromQuery(String)}, but with a {@link ValueProvider}. */

    public Read fromQuery(ValueProvider<String> query) {

      return new Read(this.inner.fromQuery(query));

    }



    /**

     * Disable validation that the table exists or the query succeeds prior to pipeline submission.

     * Basic validation (such as ensuring that a query or table is specified) still occurs.

     */

    public Read withoutValidation() {

      return new Read(this.inner.withoutValidation());

    }



    /**

     * Disable <a href="https://cloud.google.com/bigquery/docs/reference/v2/jobs">flattening of

     * query results</a>.

     *

     * <p>Only valid when a query is used ({@link #fromQuery}). Setting this option when reading

     * from a table will cause an error during validation.

     */

    public Read withoutResultFlattening() {

      return new Read(this.inner.withoutResultFlattening());

    }



    /**

     * Enables BigQuery's Standard SQL dialect when reading from a query.

     *

     * <p>Only valid when a query is used ({@link #fromQuery}). Setting this option when reading

     * from a table will cause an error during validation.

     */

    public Read usingStandardSql() {

      return new Read(this.inner.usingStandardSql());

    }



    /**

     * Uses the new, template-compatible source implementation.
     *
     * <p>This implementation is compatible with

     * repeated template invocations. It does not support dynamic work rebalancing.

     */

    @Experimental(Experimental.Kind.SOURCE_SINK)

    public Read withTemplateCompatibility() {

      return new Read(this.inner.withTemplateCompatibility());

    }

  }



  /////////////////////////////////////////////////////////////////////////////



  /** Implementation of {@link BigQueryIO#read(SerializableFunction)}. */

  @AutoValue

  public abstract static class TypedRead<T> extends PTransform<PBegin, PCollection<T>> {

    /** Determines the method used to read data from BigQuery. */

    @Experimental(Experimental.Kind.SOURCE_SINK)

    public enum Method {

      /** The default behavior if no method is explicitly set. Currently {@link #EXPORT}. */

      DEFAULT,



      /**

       * Export data to Google Cloud Storage in Avro format and read data files from that location.

       */

      EXPORT,



      /** Read the contents of a table directly using the BigQuery storage API. */

      DIRECT_READ,

    }



    abstract Builder<T> toBuilder();



    @AutoValue.Builder

    abstract static class Builder<T> {

      abstract Builder<T> setJsonTableRef(ValueProvider<String> jsonTableRef);



      abstract Builder<T> setQuery(ValueProvider<String> query);



      abstract Builder<T> setValidate(boolean validate);



      abstract Builder<T> setFlattenResults(Boolean flattenResults);



      abstract Builder<T> setUseLegacySql(Boolean useLegacySql);



      abstract Builder<T> setWithTemplateCompatibility(Boolean useTemplateCompatibility);



      abstract Builder<T> setBigQueryServices(BigQueryServices bigQueryServices);



      abstract Builder<T> setQueryPriority(QueryPriority priority);



      abstract Builder<T> setQueryLocation(String location);



      @Experimental(Experimental.Kind.SOURCE_SINK)

      abstract Builder<T> setMethod(Method method);



      @Experimental(Experimental.Kind.SOURCE_SINK)

      abstract Builder<T> setReadOptions(TableReadOptions readOptions);



      abstract TypedRead<T> build();



      abstract Builder<T> setParseFn(SerializableFunction<SchemaAndRecord, T> parseFn);



      abstract Builder<T> setCoder(Coder<T> coder);



      abstract Builder<T> setKmsKey(String kmsKey);

    }



    @Nullable

    abstract ValueProvider<String> getJsonTableRef();



    @Nullable

    abstract ValueProvider<String> getQuery();



    abstract boolean getValidate();



    @Nullable

    abstract Boolean getFlattenResults();



    @Nullable

    abstract Boolean getUseLegacySql();



    abstract Boolean getWithTemplateCompatibility();



    abstract BigQueryServices getBigQueryServices();



    abstract SerializableFunction<SchemaAndRecord, T> getParseFn();



    @Nullable

    abstract QueryPriority getQueryPriority();



    @Nullable

    abstract String getQueryLocation();



    @Experimental(Experimental.Kind.SOURCE_SINK)

    abstract Method getMethod();



    @Experimental(Experimental.Kind.SOURCE_SINK)

    @Nullable

    abstract TableReadOptions getReadOptions();



    @Nullable

    abstract Coder<T> getCoder();



    @Nullable

    abstract String getKmsKey();



    /**

     * An enumeration type for the priority of a query.

     *

     * @see <a href="https://cloud.google.com/bigquery/docs/running-queries">Running Interactive and

     *     Batch Queries in the BigQuery documentation</a>

     */

    public enum QueryPriority {

      /**

       * Specifies that a query should be run with an INTERACTIVE priority.

       *

       * <p>Interactive mode allows BigQuery to execute the query as soon as possible. These

       * queries count towards your concurrent rate limit and your daily limit.

       */

      INTERACTIVE,



      /**

       * Specifies that a query should be run with a BATCH priority.

       *

       * <p>Batch mode queries are queued by BigQuery. These are started as soon as idle resources

       * are available, usually within a few minutes. Batch queries don’t count towards your

       * concurrent rate limit.

       */

      BATCH

    }



    @VisibleForTesting

    Coder<T> inferCoder(CoderRegistry coderRegistry) {

      if (getCoder() != null) {

        return getCoder();

      }



      try {

        return coderRegistry.getCoder(TypeDescriptors.outputOf(getParseFn()));

      } catch (CannotProvideCoderException e) {

        throw new IllegalArgumentException(

            "Unable to infer coder for output of parseFn. Specify it explicitly using withCoder().",

            e);

      }

    }



    private BigQuerySourceBase<T> createSource(String jobUuid, Coder<T> coder) {

      BigQuerySourceBase<T> source;

      if (getQuery() == null) {

        source =

            BigQueryTableSource.create(

                jobUuid, getTableProvider(), getBigQueryServices(), coder, getParseFn());

      } else {

        source =

            BigQueryQuerySource.create(

                jobUuid,

                getQuery(),

                getFlattenResults(),

                getUseLegacySql(),

                getBigQueryServices(),

                coder,

                getParseFn(),

                MoreObjects.firstNonNull(getQueryPriority(), QueryPriority.BATCH),

                getQueryLocation(),

                getKmsKey());

      }

      return source;

    }



    private BigQueryStorageQuerySource<T> createStorageQuerySource(

        String stepUuid, Coder<T> outputCoder) {

      return BigQueryStorageQuerySource.create(

          stepUuid,

          getQuery(),

          getFlattenResults(),

          getUseLegacySql(),

          MoreObjects.firstNonNull(getQueryPriority(), QueryPriority.BATCH),

          getQueryLocation(),

          getKmsKey(),

          getParseFn(),

          outputCoder,

          getBigQueryServices());

    }



    private static final String QUERY_VALIDATION_FAILURE_ERROR =

        "Validation of query \"%1$s\" failed. If the query depends on an earlier stage of the"

            + " pipeline, this validation can be disabled using #withoutValidation.";



    @Override

    public void validate(PipelineOptions options) {

      // Even if existence validation is disabled, we need to make sure that the BigQueryIO

      // read is properly specified.

      BigQueryOptions bqOptions = options.as(BigQueryOptions.class);



      if (getMethod() != Method.DIRECT_READ) {

        String tempLocation = bqOptions.getTempLocation();

        checkArgument(

            !Strings.isNullOrEmpty(tempLocation),

            "BigQueryIO.Read needs a GCS temp location to store temp files.");

        if (getBigQueryServices() == null) {

          try {

            GcsPath.fromUri(tempLocation);

          } catch (IllegalArgumentException e) {

            throw new IllegalArgumentException(

                String.format(

                    "BigQuery temp location expected a valid 'gs://' path, but was given '%s'",

                    tempLocation),

                e);

          }

        }

      }



      ValueProvider<TableReference> table = getTableProvider();



      // Note that a table or query check can fail if the table or dataset are created by

      // earlier stages of the pipeline or if a query depends on earlier stages of a pipeline.

      // For these cases the withoutValidation method can be used to disable the check.

      if (getValidate()) {

        if (table != null) {

          checkArgument(table.isAccessible(), "Cannot call validate if table is dynamically set.");

        }

        if (table != null && table.get().getProjectId() != null) {

          // Check for source table presence for early failure notification.

          DatasetService datasetService = getBigQueryServices().getDatasetService(bqOptions);

          BigQueryHelpers.verifyDatasetPresence(datasetService, table.get());

          BigQueryHelpers.verifyTablePresence(datasetService, table.get());

        } else if (getQuery() != null) {

          checkArgument(

              getQuery().isAccessible(), "Cannot call validate if query is dynamically set.");

          JobService jobService = getBigQueryServices().getJobService(bqOptions);

          try {

            jobService.dryRunQuery(

                bqOptions.getProject(),

                new JobConfigurationQuery()

                    .setQuery(getQuery().get())

                    .setFlattenResults(getFlattenResults())

                    .setUseLegacySql(getUseLegacySql()),

                getQueryLocation());

          } catch (Exception e) {

            throw new IllegalArgumentException(

                String.format(QUERY_VALIDATION_FAILURE_ERROR, getQuery().get()), e);

          }

        }

      }

    }



    @Override

    public PCollection<T> expand(PBegin input) {

      ValueProvider<TableReference> table = getTableProvider();



      if (table != null) {

        checkArgument(getQuery() == null, "from() and fromQuery() are exclusive");

        checkArgument(

            getQueryPriority() == null,

            "withQueryPriority() can only be specified when using fromQuery()");

        checkArgument(

            getFlattenResults() == null,

            "Invalid BigQueryIO.Read: Specifies a table with a result flattening"

                + " preference, which only applies to queries");

        checkArgument(

            getUseLegacySql() == null,

            "Invalid BigQueryIO.Read: Specifies a table with a SQL dialect"

                + " preference, which only applies to queries");

        if (table.isAccessible() && Strings.isNullOrEmpty(table.get().getProjectId())) {

          LOG.info(

              "Project of {} not set. The value of {}.getProject() at execution time will be used.",

              TableReference.class.getSimpleName(),

              BigQueryOptions.class.getSimpleName());

        }

      } else {

        checkArgument(getQuery() != null, "Either from() or fromQuery() is required");

        checkArgument(

            getFlattenResults() != null, "flattenResults should not be null if query is set");

        checkArgument(getUseLegacySql() != null, "useLegacySql should not be null if query is set");

      }

      checkArgument(getParseFn() != null, "A parseFn is required");



      Pipeline p = input.getPipeline();

      final Coder<T> coder = inferCoder(p.getCoderRegistry());



      if (getMethod() == Method.DIRECT_READ) {

        return expandForDirectRead(input, coder);

      }



      checkArgument(

          getReadOptions() == null,

          "Invalid BigQueryIO.Read: Specifies table read options, "

              + "which only applies when using Method.DIRECT_READ");



      final PCollectionView<String> jobIdTokenView;

      PCollection<String> jobIdTokenCollection;

      PCollection<T> rows;

      if (!getWithTemplateCompatibility()) {

        // Create a singleton job ID token at construction time.

        final String staticJobUuid = BigQueryHelpers.randomUUIDString();

        jobIdTokenView =

            p.apply("TriggerIdCreation", Create.of(staticJobUuid))

                .apply("ViewId", View.asSingleton());

        // Apply the traditional Source model.

        rows = p.apply(com.bff.gaia.unified.sdk.io.Read.from(createSource(staticJobUuid, coder)));

      } else {

        // Create a singleton job ID token at execution time.

        jobIdTokenCollection =

            p.apply("TriggerIdCreation", Create.of("ignored"))

                .apply(

                    "CreateJobId",

                    MapElements.via(

                        new SimpleFunction<String, String>() {

                          @Override

                          public String apply(String input) {

                            return BigQueryHelpers.randomUUIDString();

                          }

                        }));

        jobIdTokenView = jobIdTokenCollection.apply("ViewId", View.asSingleton());



        final TupleTag<String> filesTag = new TupleTag<>();

        final TupleTag<String> tableSchemaTag = new TupleTag<>();

        PCollectionTuple tuple =

            jobIdTokenCollection.apply(

                "RunCreateJob",

                ParDo.of(

                        new DoFn<String, String>() {

                          @ProcessElement

                          public void processElement(ProcessContext c) throws Exception {

                            String jobUuid = c.element();

                            BigQuerySourceBase<T> source = createSource(jobUuid, coder);

                            BigQueryOptions options =

                                c.getPipelineOptions().as(BigQueryOptions.class);

                            ExtractResult res = source.extractFiles(options);

                            LOG.info("Extract job produced {} files", res.extractedFiles.size());

                            source.cleanupTempResource(options);

                            for (ResourceId file : res.extractedFiles) {

                              c.output(file.toString());

                            }

                            c.output(tableSchemaTag, BigQueryHelpers.toJsonString(res.schema));

                          }

                        })

                    .withOutputTags(filesTag, TupleTagList.of(tableSchemaTag)));

        tuple.get(filesTag).setCoder(StringUtf8Coder.of());

        tuple.get(tableSchemaTag).setCoder(StringUtf8Coder.of());

        final PCollectionView<String> schemaView =

            tuple.get(tableSchemaTag).apply(View.asSingleton());

        rows =

            tuple

                .get(filesTag)

                .apply(Reshuffle.viaRandomKey())

                .apply(

                    "ReadFiles",

                    ParDo.of(

                            new DoFn<String, T>() {

                              @ProcessElement

                              public void processElement(ProcessContext c) throws Exception {

                                TableSchema schema =

                                    BigQueryHelpers.fromJsonString(

                                        c.sideInput(schemaView), TableSchema.class);

                                String jobUuid = c.sideInput(jobIdTokenView);

                                BigQuerySourceBase<T> source = createSource(jobUuid, coder);

                                List<BoundedSource<T>> sources =

                                    source.createSources(

                                        ImmutableList.of(

                                            FileSystems.matchNewResource(

                                                c.element(), false /* is directory */)),

                                        schema,

                                        null);

                                checkArgument(sources.size() == 1, "Expected exactly one source.");

                                BoundedSource<T> avroSource = sources.get(0);

                                BoundedSource.BoundedReader<T> reader =

                                    avroSource.createReader(c.getPipelineOptions());

                                for (boolean more = reader.start(); more; more = reader.advance()) {

                                  c.output(reader.getCurrent());

                                }

                              }

                            })

                        .withSideInputs(schemaView, jobIdTokenView))

                .setCoder(coder);

      }

      PassThroughThenCleanup.CleanupOperation cleanupOperation =

          new PassThroughThenCleanup.CleanupOperation() {

            @Override

            void cleanup(PassThroughThenCleanup.ContextContainer c) throws Exception {

              PipelineOptions options = c.getPipelineOptions();

              BigQueryOptions bqOptions = options.as(BigQueryOptions.class);

              String jobUuid = c.getJobId();

              final String extractDestinationDir =

                  resolveTempLocation(bqOptions.getTempLocation(), "BigQueryExtractTemp", jobUuid);

              final String executingProject = bqOptions.getProject();

              JobReference jobRef =

                  new JobReference()

                      .setProjectId(executingProject)

                      .setJobId(getExtractJobId(createJobIdToken(bqOptions.getJobName(), jobUuid)));



              Job extractJob = getBigQueryServices().getJobService(bqOptions).getJob(jobRef);



              if (extractJob != null) {

                List<ResourceId> extractFiles =

                    getExtractFilePaths(extractDestinationDir, extractJob);

                if (extractFiles != null && !extractFiles.isEmpty()) {

                  FileSystems.delete(

                      extractFiles, MoveOptions.StandardMoveOptions.IGNORE_MISSING_FILES);

                }

              }

            }

          };

      return rows.apply(new PassThroughThenCleanup<>(cleanupOperation, jobIdTokenView));

    }



    private PCollection<T> expandForDirectRead(PBegin input, Coder<T> outputCoder) {

      ValueProvider<TableReference> tableProvider = getTableProvider();

      Pipeline p = input.getPipeline();

      if (tableProvider != null) {

        // No job ID is required. Read directly from BigQuery storage.

        return p.apply(

            com.bff.gaia.unified.sdk.io.Read.from(

                BigQueryStorageTableSource.create(

                    tableProvider,

                    getReadOptions(),

                    getParseFn(),

                    outputCoder,

                    getBigQueryServices())));

      }



      checkArgument(

          getReadOptions() == null,

          "Invalid BigQueryIO.Read: Specifies table read options, "

              + "which only applies when reading from a table");



      //

      // N.B. All of the code below exists because the BigQuery storage API can't (yet) read from

      // all anonymous tables, so we need the job ID to reason about the name of the destination

      // table for the query to read the data and subsequently delete the table and dataset. Once

      // the storage API can handle anonymous tables, the storage source should be modified to use

      // anonymous tables and all of the code related to job ID generation and table and dataset

      // cleanup can be removed. [BEAM-6931]

      //



      PCollectionView<String> jobIdTokenView;

      PCollection<T> rows;



      if (!getWithTemplateCompatibility()) {

        // Create a singleton job ID token at pipeline construction time.

        String staticJobUuid = BigQueryHelpers.randomUUIDString();

        jobIdTokenView =

            p.apply("TriggerIdCreation", Create.of(staticJobUuid))

                .apply("ViewId", View.asSingleton());

        // Apply the traditional Source model.

        rows =

            p.apply(

                com.bff.gaia.unified.sdk.io.Read.from(

                    createStorageQuerySource(staticJobUuid, outputCoder)));

      } else {

        // Create a singleton job ID token at pipeline execution time.

        PCollection<String> jobIdTokenCollection =

            p.apply("TriggerIdCreation", Create.of("ignored"))

                .apply(

                    "CreateJobId",

                    MapElements.via(

                        new SimpleFunction<String, String>() {

                          @Override

                          public String apply(String input) {

                            return BigQueryHelpers.randomUUIDString();

                          }

                        }));



        jobIdTokenView = jobIdTokenCollection.apply("ViewId", View.asSingleton());



        TupleTag<Stream> streamsTag = new TupleTag<>();

        TupleTag<ReadSession> readSessionTag = new TupleTag<>();

        TupleTag<String> tableSchemaTag = new TupleTag<>();



        PCollectionTuple tuple =

            jobIdTokenCollection.apply(

                "RunQueryJob",

                ParDo.of(

                        new DoFn<String, Stream>() {

                          @ProcessElement

                          public void processElement(ProcessContext c) throws Exception {

                            BigQueryOptions options =

                                c.getPipelineOptions().as(BigQueryOptions.class);

                            String jobUuid = c.element();

                            // Execute the query and get the destination table holding the results.

                            // The getTargetTable call runs a new instance of the query and returns

                            // the destination table created to hold the results.

                            BigQueryStorageQuerySource<T> querySource =

                                createStorageQuerySource(jobUuid, outputCoder);

                            Table queryResultTable = querySource.getTargetTable(options);



                            // Create a read session without specifying a desired stream count and

                            // let the BigQuery storage server pick the number of streams.

                            CreateReadSessionRequest request =

                                CreateReadSessionRequest.newBuilder()

                                    .setParent("projects/" + options.getProject())

                                    .setTableReference(

                                        BigQueryHelpers.toTableRefProto(

                                            queryResultTable.getTableReference()))

                                    .setRequestedStreams(0)

                                    .build();



                            ReadSession readSession;

                            try (StorageClient storageClient =

                                getBigQueryServices().getStorageClient(options)) {

                              readSession = storageClient.createReadSession(request);

                            }



                            for (Stream stream : readSession.getStreamsList()) {

                              c.output(stream);

                            }



                            c.output(readSessionTag, readSession);

                            c.output(

                                tableSchemaTag,

                                BigQueryHelpers.toJsonString(queryResultTable.getSchema()));

                          }

                        })

                    .withOutputTags(

                        streamsTag, TupleTagList.of(readSessionTag).and(tableSchemaTag)));



        tuple.get(streamsTag).setCoder(ProtoCoder.of(Stream.class));

        tuple.get(readSessionTag).setCoder(ProtoCoder.of(ReadSession.class));

        tuple.get(tableSchemaTag).setCoder(StringUtf8Coder.of());



        PCollectionView<ReadSession> readSessionView =

            tuple.get(readSessionTag).apply("ReadSessionView", View.asSingleton());

        PCollectionView<String> tableSchemaView =

            tuple.get(tableSchemaTag).apply("TableSchemaView", View.asSingleton());



        rows =

            tuple

                .get(streamsTag)

                .apply(Reshuffle.viaRandomKey())

                .apply(

                    ParDo.of(

                            new DoFn<Stream, T>() {

                              @ProcessElement

                              public void processElement(ProcessContext c) throws Exception {

                                ReadSession readSession = c.sideInput(readSessionView);

                                TableSchema tableSchema =

                                    BigQueryHelpers.fromJsonString(

                                        c.sideInput(tableSchemaView), TableSchema.class);

                                Stream stream = c.element();



                                BigQueryStorageStreamSource<T> streamSource =

                                    BigQueryStorageStreamSource.create(

                                        readSession,

                                        stream,

                                        tableSchema,

                                        getParseFn(),

                                        outputCoder,

                                        getBigQueryServices());



                                // Read all of the data from the stream. In the event that this work

                                // item fails and is rescheduled, the same rows will be returned in

                                // the same order.

                                BoundedSource.BoundedReader<T> reader =

                                    streamSource.createReader(c.getPipelineOptions());

                                for (boolean more = reader.start(); more; more = reader.advance()) {

                                  c.output(reader.getCurrent());

                                }

                              }

                            })

                        .withSideInputs(readSessionView, tableSchemaView))

                .setCoder(outputCoder);

      }



      PassThroughThenCleanup.CleanupOperation cleanupOperation =

          new CleanupOperation() {

            @Override

            void cleanup(ContextContainer c) throws Exception {

              BigQueryOptions options = c.getPipelineOptions().as(BigQueryOptions.class);

              String jobUuid = c.getJobId();



              TableReference tempTable =

                  createTempTableReference(

                      options.getProject(), createJobIdToken(options.getJobName(), jobUuid));



              DatasetService datasetService = getBigQueryServices().getDatasetService(options);

              LOG.info("Deleting temporary table with query results {}", tempTable);

              datasetService.deleteTable(tempTable);

              LOG.info(

                  "Deleting temporary dataset with query results {}", tempTable.getDatasetId());

              datasetService.deleteDataset(tempTable.getProjectId(), tempTable.getDatasetId());

            }

          };



      return rows.apply(new PassThroughThenCleanup<>(cleanupOperation, jobIdTokenView));

    }



    @Override

    public void populateDisplayData(DisplayData.Builder builder) {

      super.populateDisplayData(builder);

      builder

          .addIfNotNull(

              DisplayData.item("table", BigQueryHelpers.displayTable(getTableProvider()))

                  .withLabel("Table"))

          .addIfNotNull(DisplayData.item("query", getQuery()).withLabel("Query"))

          .addIfNotNull(

              DisplayData.item("flattenResults", getFlattenResults())

                  .withLabel("Flatten Query Results"))

          .addIfNotNull(

              DisplayData.item("useLegacySql", getUseLegacySql())

                  .withLabel("Use Legacy SQL Dialect"))

          .addIfNotDefault(

              DisplayData.item("validation", getValidate()).withLabel("Validation Enabled"), true);

    }



    /** Ensures that methods of the from() / fromQuery() family are called at most once. */

    private void ensureFromNotCalledYet() {

      checkState(

          getJsonTableRef() == null && getQuery() == null, "from() or fromQuery() already called");

    }



    /** See {@link Read#getTableProvider()}. */

    @Nullable

    public ValueProvider<TableReference> getTableProvider() {

      return getJsonTableRef() == null

          ? null

          : NestedValueProvider.of(getJsonTableRef(), new JsonTableRefToTableRef());

    }



    /** See {@link Read#getTable()}. */

    @Nullable

    public TableReference getTable() {

      ValueProvider<TableReference> provider = getTableProvider();

      return provider == null ? null : provider.get();

    }



    /**

     * Sets a {@link Coder} for the result of the parse function. This may be required if a coder

     * cannot be inferred automatically.

     */

    public TypedRead<T> withCoder(Coder<T> coder) {

      return toBuilder().setCoder(coder).build();

    }



    /** For query sources, use this Cloud KMS key to encrypt any temporary tables created. */

    public TypedRead<T> withKmsKey(String kmsKey) {

      return toBuilder().setKmsKey(kmsKey).build();

    }



    /** See {@link Read#from(String)}. */

    public TypedRead<T> from(String tableSpec) {

      return from(StaticValueProvider.of(tableSpec));

    }



    /** See {@link Read#from(ValueProvider)}. */

    public TypedRead<T> from(ValueProvider<String> tableSpec) {

      ensureFromNotCalledYet();

      return toBuilder()

          .setJsonTableRef(

              NestedValueProvider.of(

                  NestedValueProvider.of(tableSpec, new TableSpecToTableRef()),

                  new TableRefToJson()))

          .build();

    }



    /** See {@link Read#fromQuery(String)}. */

    public TypedRead<T> fromQuery(String query) {

      return fromQuery(StaticValueProvider.of(query));

    }



    /** See {@link Read#fromQuery(ValueProvider)}. */

    public TypedRead<T> fromQuery(ValueProvider<String> query) {

      ensureFromNotCalledYet();

      return toBuilder().setQuery(query).setFlattenResults(true).setUseLegacySql(true).build();

    }



    /** See {@link Read#from(TableReference)}. */

    public TypedRead<T> from(TableReference table) {

      return from(StaticValueProvider.of(BigQueryHelpers.toTableSpec(table)));

    }



    /** See {@link Read#withoutValidation()}. */

    public TypedRead<T> withoutValidation() {

      return toBuilder().setValidate(false).build();

    }



    /** See {@link Read#withoutResultFlattening()}. */

    public TypedRead<T> withoutResultFlattening() {

      return toBuilder().setFlattenResults(false).build();

    }



    /** See {@link Read#usingStandardSql()}. */

    public TypedRead<T> usingStandardSql() {

      return toBuilder().setUseLegacySql(false).build();

    }



    /** See {@link QueryPriority}. */

    public TypedRead<T> withQueryPriority(QueryPriority priority) {

      return toBuilder().setQueryPriority(priority).build();

    }



    /**

     * BigQuery geographic location where the query <a

     * href="https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs">job</a> will be

     * executed. If not specified, Unified tries to determine the location by examining the tables

     * referenced by the query. Location must be specified for queries not executed in US or EU. See

     * <a href="https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs/query">BigQuery Jobs:

     * query</a>.
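     *
     * <p>For example, a query against tables stored in the Tokyo region might be configured as
     * follows (the query text and location string are illustrative placeholders):
     *
     * <pre>{@code
     * read
     *     .fromQuery("SELECT word, count FROM `my_dataset.my_table`")
     *     .usingStandardSql()
     *     .withQueryLocation("asia-northeast1");
     * }</pre>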

     */

    public TypedRead<T> withQueryLocation(String location) {

      return toBuilder().setQueryLocation(location).build();

    }



    /** See {@link Method}. */

    @Experimental(Experimental.Kind.SOURCE_SINK)

    public TypedRead<T> withMethod(Method method) {

      return toBuilder().setMethod(method).build();

    }



    /** Read options, including a list of selected columns and push-down SQL filter text. */

    @Experimental(Experimental.Kind.SOURCE_SINK)

    public TypedRead<T> withReadOptions(TableReadOptions readOptions) {

      return toBuilder().setReadOptions(readOptions).build();

    }



    @Experimental(Experimental.Kind.SOURCE_SINK)

    public TypedRead<T> withTemplateCompatibility() {

      return toBuilder().setWithTemplateCompatibility(true).build();

    }



    @VisibleForTesting

    TypedRead<T> withTestServices(BigQueryServices testServices) {

      return toBuilder().setBigQueryServices(testServices).build();

    }

  }



  static String getExtractDestinationUri(String extractDestinationDir) {

    return String.format("%s/%s", extractDestinationDir, "*.avro");

  }



  static List<ResourceId> getExtractFilePaths(String extractDestinationDir, Job extractJob)

      throws IOException {

    JobStatistics jobStats = extractJob.getStatistics();

    List<Long> counts = jobStats.getExtract().getDestinationUriFileCounts();

    if (counts.size() != 1) {

      String errorMessage =

          counts.isEmpty()

              ? "No destination uri file count received."

              : String.format(

                  "More than one destination uri file count received. First two are %s, %s",

                  counts.get(0), counts.get(1));

      throw new RuntimeException(errorMessage);

    }

    long filesCount = counts.get(0);



    ImmutableList.Builder<ResourceId> paths = ImmutableList.builder();

    ResourceId extractDestinationDirResourceId =

        FileSystems.matchNewResource(extractDestinationDir, true /* isDirectory */);

    for (long i = 0; i < filesCount; ++i) {

      ResourceId filePath =

          extractDestinationDirResourceId.resolve(

              String.format("%012d%s", i, ".avro"),

              ResolveOptions.StandardResolveOptions.RESOLVE_FILE);

      paths.add(filePath);

    }

    return paths.build();

  }



  /////////////////////////////////////////////////////////////////////////////



  /**

   * A {@link PTransform} that writes a {@link PCollection} to a BigQuery table. A formatting

   * function must be provided to convert each input element into a {@link TableRow} using {@link

   * Write#withFormatFunction(SerializableFunction)}.

   *

   * <p>In BigQuery, each table has an enclosing dataset. The dataset being written must already

   * exist.

   *

   * <p>By default, tables will be created if they do not exist, which corresponds to a {@link

   * Write.CreateDisposition#CREATE_IF_NEEDED} disposition that matches the default of BigQuery's

   * Jobs API. A schema must be provided (via {@link Write#withSchema(TableSchema)}), or else the

   * transform may fail at runtime with an {@link IllegalArgumentException}.

   *

   * <p>By default, writes require an empty table, which corresponds to a {@link

   * Write.WriteDisposition#WRITE_EMPTY} disposition that matches the default of BigQuery's Jobs

   * API.

   *

   * <p>Here is a sample transform that produces TableRow values containing "word" and "count"

   * columns:

   *

   * <pre>{@code

   * static class FormatCountsFn extends DoFn<KV<String, Long>, TableRow> {

   *   public void processElement(ProcessContext c) {

   *     TableRow row = new TableRow()

   *         .set("word", c.element().getKey())

   *         .set("count", c.element().getValue().intValue());

   *     c.output(row);

   *   }

   * }

   * }</pre>
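   *
   * <p>A sketch of applying the write for the {@code FormatCountsFn} output above might look like
   * the following (the table spec and schema field names are illustrative placeholders):
   *
   * <pre>{@code
   * PCollection<KV<String, Long>> wordCounts = ...;
   * wordCounts
   *     .apply(ParDo.of(new FormatCountsFn()))
   *     .apply(BigQueryIO.writeTableRows()
   *         .to("my-project:my_dataset.word_counts")
   *         .withSchema(new TableSchema().setFields(Arrays.asList(
   *             new TableFieldSchema().setName("word").setType("STRING"),
   *             new TableFieldSchema().setName("count").setType("INTEGER")))));
   * }</pre>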

   */

  public static <T> Write<T> write() {

    return new AutoValue_BigQueryIO_Write.Builder<T>()

        .setValidate(true)

        .setBigQueryServices(new BigQueryServicesImpl())

        .setCreateDisposition(Write.CreateDisposition.CREATE_IF_NEEDED)

        .setWriteDisposition(Write.WriteDisposition.WRITE_EMPTY)

        .setNumFileShards(0)

        .setMethod(Write.Method.DEFAULT)

        .setExtendedErrorInfo(false)

        .setSkipInvalidRows(false)

        .setIgnoreUnknownValues(false)

        .setMaxFilesPerPartition(BatchLoads.DEFAULT_MAX_FILES_PER_PARTITION)

        .setMaxBytesPerPartition(BatchLoads.DEFAULT_MAX_BYTES_PER_PARTITION)

        .setOptimizeWrites(false)

        .setUseUnifiedSchema(false)

        .build();

  }



  /**

   * A {@link PTransform} that writes a {@link PCollection} containing {@link TableRow TableRows} to

   * a BigQuery table.

   *

   * <p>It is recommended to instead use {@link #write} with {@link

   * Write#withFormatFunction(SerializableFunction)}.

   */

  public static Write<TableRow> writeTableRows() {

    return BigQueryIO.<TableRow>write().withFormatFunction(IDENTITY_FORMATTER);

  }



  /** Implementation of {@link #write}. */

  @AutoValue

  public abstract static class Write<T> extends PTransform<PCollection<T>, WriteResult> {

    /** Determines the method used to insert data in BigQuery. */

    public enum Method {

      /**

       * The default behavior if no method is explicitly set. If the input is bounded, then file

       * loads will be used. If the input is unbounded, then streaming inserts will be used.

       */

      DEFAULT,



      /**

       * Use BigQuery load jobs to insert data. Records will first be written to files, and these

       * files will be loaded into BigQuery. This is the default method when the input is bounded.

       * This method can be chosen for unbounded inputs as well, as long as a triggering frequency

       * is also set using {@link #withTriggeringFrequency}. BigQuery has daily quotas on the number

       * of load jobs allowed per day, so be careful not to set the triggering frequency too

       * high. For more information, see <a

       * href="https://cloud.google.com/bigquery/docs/loading-data-cloud-storage">Loading Data from

       * Cloud Storage</a>.

       */

      FILE_LOADS,



      /**

       * Use the BigQuery streaming insert API to insert data. This provides the lowest-latency

       * insert path into BigQuery, and therefore is the default method when the input is unbounded.

       * BigQuery will make a strong effort to ensure no duplicates when using this path; however,

       * there are some scenarios in which BigQuery is unable to make this guarantee (see

       * https://cloud.google.com/bigquery/streaming-data-into-bigquery). A query can be run over

       * the output table to periodically clean these rare duplicates. Alternatively, using the

       * {@link #FILE_LOADS} insert method does guarantee no duplicates, though the latency for the

       * insert into BigQuery will be much higher. For more information, see <a

       * href="https://cloud.google.com/bigquery/streaming-data-into-bigquery">Streaming Data into

       * BigQuery</a>.

       */

      STREAMING_INSERTS

    }



    @Nullable

    abstract ValueProvider<String> getJsonTableRef();



    @Nullable

    abstract SerializableFunction<ValueInSingleWindow<T>, TableDestination> getTableFunction();



    @Nullable

    abstract SerializableFunction<T, TableRow> getFormatFunction();



    @Nullable

    abstract DynamicDestinations<T, ?> getDynamicDestinations();



    @Nullable

    abstract PCollectionView<Map<String, String>> getSchemaFromView();



    @Nullable

    abstract ValueProvider<String> getJsonSchema();



    @Nullable

    abstract ValueProvider<String> getJsonTimePartitioning();



    abstract CreateDisposition getCreateDisposition();



    abstract WriteDisposition getWriteDisposition();

    /** Table description. Default is empty. */

    @Nullable

    abstract String getTableDescription();

    /** An option to indicate if table validation is desired. Default is true. */

    abstract boolean getValidate();



    abstract BigQueryServices getBigQueryServices();



    @Nullable

    abstract Integer getMaxFilesPerBundle();



    @Nullable

    abstract Long getMaxFileSize();



    abstract int getNumFileShards();



    abstract int getMaxFilesPerPartition();



    abstract long getMaxBytesPerPartition();



    @Nullable

    abstract Duration getTriggeringFrequency();



    abstract Method getMethod();



    @Nullable

    abstract ValueProvider<String> getLoadJobProjectId();



    @Nullable

    abstract InsertRetryPolicy getFailedInsertRetryPolicy();



    @Nullable

    abstract ValueProvider<String> getCustomGcsTempLocation();



    abstract boolean getExtendedErrorInfo();



    abstract Boolean getSkipInvalidRows();



    abstract Boolean getIgnoreUnknownValues();



    @Nullable

    abstract String getKmsKey();



    abstract Boolean getOptimizeWrites();



    abstract Boolean getUseUnifiedSchema();



    abstract Builder<T> toBuilder();



    @AutoValue.Builder

    abstract static class Builder<T> {

      abstract Builder<T> setJsonTableRef(ValueProvider<String> jsonTableRef);



      abstract Builder<T> setTableFunction(

          SerializableFunction<ValueInSingleWindow<T>, TableDestination> tableFunction);



      abstract Builder<T> setFormatFunction(SerializableFunction<T, TableRow> formatFunction);



      abstract Builder<T> setDynamicDestinations(DynamicDestinations<T, ?> dynamicDestinations);



      abstract Builder<T> setSchemaFromView(PCollectionView<Map<String, String>> view);



      abstract Builder<T> setJsonSchema(ValueProvider<String> jsonSchema);



      abstract Builder<T> setJsonTimePartitioning(ValueProvider<String> jsonTimePartitioning);



      abstract Builder<T> setCreateDisposition(CreateDisposition createDisposition);



      abstract Builder<T> setWriteDisposition(WriteDisposition writeDisposition);



      abstract Builder<T> setTableDescription(String tableDescription);



      abstract Builder<T> setValidate(boolean validate);



      abstract Builder<T> setBigQueryServices(BigQueryServices bigQueryServices);



      abstract Builder<T> setMaxFilesPerBundle(Integer maxFilesPerBundle);



      abstract Builder<T> setMaxFileSize(Long maxFileSize);



      abstract Builder<T> setNumFileShards(int numFileShards);



      abstract Builder<T> setMaxFilesPerPartition(int maxFilesPerPartition);



      abstract Builder<T> setMaxBytesPerPartition(long maxBytesPerPartition);



      abstract Builder<T> setTriggeringFrequency(Duration triggeringFrequency);



      abstract Builder<T> setMethod(Method method);



      abstract Builder<T> setLoadJobProjectId(ValueProvider<String> loadJobProjectId);



      abstract Builder<T> setFailedInsertRetryPolicy(InsertRetryPolicy retryPolicy);



      abstract Builder<T> setCustomGcsTempLocation(ValueProvider<String> customGcsTempLocation);



      abstract Builder<T> setExtendedErrorInfo(boolean extendedErrorInfo);



      abstract Builder<T> setSkipInvalidRows(Boolean skipInvalidRows);



      abstract Builder<T> setIgnoreUnknownValues(Boolean ignoreUnknownValues);



      abstract Builder<T> setKmsKey(String kmsKey);



      abstract Builder<T> setOptimizeWrites(Boolean optimizeWrites);



      abstract Builder<T> setUseUnifiedSchema(Boolean useUnifiedSchema);



      abstract Write<T> build();

    }



    /**

     * An enumeration type for the BigQuery create disposition strings.

     *

     * @see <a

     *     href="https://cloud.google.com/bigquery/docs/reference/v2/jobs#configuration.query.createDisposition">

     *     <code>configuration.query.createDisposition</code> in the BigQuery Jobs API</a>

     */

    public enum CreateDisposition {

      /**

       * Specifies that tables should not be created.

       *

       * <p>If the output table does not exist, the write fails.

       */

      CREATE_NEVER,



      /**

       * Specifies that tables should be created if needed. This is the default behavior.

       *

       * <p>Requires that a table schema is provided via {@link BigQueryIO.Write#withSchema}. This

       * precondition is checked before starting a job. The schema is not required to match an

       * existing table's schema.

       *

       * <p>When this transformation is executed, if the output table does not exist, the table is

       * created from the provided schema. Note that even if the table exists, it may be recreated

       * if necessary when paired with a {@link WriteDisposition#WRITE_TRUNCATE}.

       */

      CREATE_IF_NEEDED

    }



    /**

     * An enumeration type for the BigQuery write disposition strings.

     *

     * @see <a

     *     href="https://cloud.google.com/bigquery/docs/reference/v2/jobs#configuration.query.writeDisposition">

     *     <code>configuration.query.writeDisposition</code> in the BigQuery Jobs API</a>

     */

    public enum WriteDisposition {

      /**

       * Specifies that write should replace a table.

       *

       * <p>The replacement may occur in multiple steps - for instance by first removing the

       * existing table, then creating a replacement, then filling it in. This is not an atomic

       * operation, and external programs may see the table in any of these intermediate steps.

       */

      WRITE_TRUNCATE,



      /** Specifies that rows may be appended to an existing table. */

      WRITE_APPEND,



      /**

       * Specifies that the output table must be empty. This is the default behavior.

       *

       * <p>If the output table is not empty, the write fails at runtime.

       *

       * <p>This check may occur long before data is written, and does not guarantee exclusive

       * access to the table. If two programs are run concurrently, each specifying the same output

       * table and a {@link WriteDisposition} of {@link WriteDisposition#WRITE_EMPTY}, it is

       * possible for both to succeed.

       */

      WRITE_EMPTY

    }



    /**

     * Writes to the given table, specified in the format described in {@link

     * BigQueryHelpers#parseTableSpec}.
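     *
     * <p>A table spec names the project, dataset, and table, for example (the identifiers below
     * are placeholders):
     *
     * <pre>{@code
     * write.to("my-project:my_dataset.my_table");   // fully qualified
     * write.to("my_dataset.my_table");              // table in the default project
     * }</pre>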

     */

    public Write<T> to(String tableSpec) {

      return to(StaticValueProvider.of(tableSpec));

    }



    /** Writes to the given table, specified as a {@link TableReference}. */

    public Write<T> to(TableReference table) {

      return to(StaticValueProvider.of(BigQueryHelpers.toTableSpec(table)));

    }



    /** Same as {@link #to(String)}, but with a {@link ValueProvider}. */

    public Write<T> to(ValueProvider<String> tableSpec) {

      checkArgument(tableSpec != null, "tableSpec can not be null");

      return toBuilder()

          .setJsonTableRef(

              NestedValueProvider.of(

                  NestedValueProvider.of(tableSpec, new TableSpecToTableRef()),

                  new TableRefToJson()))

          .build();

    }



    /**

     * Writes to the table computed by the given table function. The table is a function of {@link
     * ValueInSingleWindow}, so it can be determined by the element value or by the window.
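     *
     * <p>For example, for a {@code Write<TableRow>}, a sketch of routing elements to a per-day
     * table based on the element's window might look like the following (the project, dataset,
     * and date format are illustrative, and Joda-Time's {@code DateTimeFormat} is used for
     * formatting):
     *
     * <pre>{@code
     * write.to((ValueInSingleWindow<TableRow> value) -> {
     *   String day =
     *       DateTimeFormat.forPattern("yyyyMMdd").print(value.getWindow().maxTimestamp());
     *   return new TableDestination("my-project:my_dataset.events_" + day, null);
     * });
     * }</pre>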

     */

    public Write<T> to(

        SerializableFunction<ValueInSingleWindow<T>, TableDestination> tableFunction) {

      checkArgument(tableFunction != null, "tableFunction can not be null");

      return toBuilder().setTableFunction(tableFunction).build();

    }



    /** Writes to the table and schema specified by the {@link DynamicDestinations} object. */

    public Write<T> to(DynamicDestinations<T, ?> dynamicDestinations) {

      checkArgument(dynamicDestinations != null, "dynamicDestinations can not be null");

      return toBuilder().setDynamicDestinations(dynamicDestinations).build();

    }



    /** Formats the user's type into a {@link TableRow} to be written to BigQuery. */

    public Write<T> withFormatFunction(SerializableFunction<T, TableRow> formatFunction) {

      return toBuilder().setFormatFunction(formatFunction).build();

    }



    /**

     * Uses the specified schema for rows to be written.

     *

     * <p>The schema is <i>required</i> only if writing to a table that does not already exist, and

     * {@link CreateDisposition} is set to {@link CreateDisposition#CREATE_IF_NEEDED}.
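     *
     * <p>For example, a two-column schema could be constructed as follows (the field names are
     * illustrative):
     *
     * <pre>{@code
     * TableSchema schema = new TableSchema().setFields(Arrays.asList(
     *     new TableFieldSchema().setName("word").setType("STRING"),
     *     new TableFieldSchema().setName("count").setType("INTEGER")));
     * write.withSchema(schema);
     * }</pre>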

     */

    public Write<T> withSchema(TableSchema schema) {

      checkArgument(schema != null, "schema can not be null");

      return withJsonSchema(StaticValueProvider.of(BigQueryHelpers.toJsonString(schema)));

    }



    /** Same as {@link #withSchema(TableSchema)} but using a deferred {@link ValueProvider}. */

    public Write<T> withSchema(ValueProvider<TableSchema> schema) {

      checkArgument(schema != null, "schema can not be null");

      return withJsonSchema(NestedValueProvider.of(schema, new TableSchemaToJsonSchema()));

    }



    /**

     * Similar to {@link #withSchema(TableSchema)} but takes in a JSON-serialized {@link

     * TableSchema}.

     */

    public Write<T> withJsonSchema(String jsonSchema) {

      checkArgument(jsonSchema != null, "jsonSchema can not be null");

      return withJsonSchema(StaticValueProvider.of(jsonSchema));

    }



    /** Same as {@link #withJsonSchema(String)} but using a deferred {@link ValueProvider}. */

    public Write<T> withJsonSchema(ValueProvider<String> jsonSchema) {

      checkArgument(jsonSchema != null, "jsonSchema can not be null");

      return toBuilder().setJsonSchema(jsonSchema).build();

    }



    /**

     * Allows the schemas for each table to be computed within the pipeline itself.

     *

     * <p>The input is a map-valued {@link PCollectionView} mapping string tablespecs to

     * JSON-formatted {@link TableSchema}s. Tablespecs must be in the same format as taken by {@link

     * #to(String)}.
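     *
     * <p>A sketch of supplying schemas computed in the pipeline (the source of the map entries is
     * illustrative, and this assumes this SDK's {@code View.asMap()} transform):
     *
     * <pre>{@code
     * PCollection<KV<String, String>> tableSpecToJsonSchema = ...;
     * PCollectionView<Map<String, String>> schemaView =
     *     tableSpecToJsonSchema.apply(View.<String, String>asMap());
     * write.withSchemaFromView(schemaView);
     * }</pre>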

     */

    public Write<T> withSchemaFromView(PCollectionView<Map<String, String>> view) {

      checkArgument(view != null, "view can not be null");

      return toBuilder().setSchemaFromView(view).build();

    }



    /**

     * Allows newly created tables to include a {@link TimePartitioning} class. Can only be used

     * when writing to a single table. If {@link #to(SerializableFunction)} or {@link

     * #to(DynamicDestinations)} is used to write to dynamic tables, time partitioning can be set
     * directly in the returned {@link TableDestination}.
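     *
     * <p>For example, day-based partitioning with a one-week expiration could be requested
     * roughly as follows (the expiration value is just an illustration):
     *
     * <pre>{@code
     * write.withTimePartitioning(
     *     new TimePartitioning().setType("DAY").setExpirationMs(7 * 24 * 3600 * 1000L));
     * }</pre>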

     */

    public Write<T> withTimePartitioning(TimePartitioning partitioning) {

      checkArgument(partitioning != null, "partitioning can not be null");

      return withJsonTimePartitioning(

          StaticValueProvider.of(BigQueryHelpers.toJsonString(partitioning)));

    }



    /**

     * Like {@link #withTimePartitioning(TimePartitioning)} but using a deferred {@link

     * ValueProvider}.

     */

    public Write<T> withTimePartitioning(ValueProvider<TimePartitioning> partitioning) {

      checkArgument(partitioning != null, "partitioning can not be null");

      return withJsonTimePartitioning(

          NestedValueProvider.of(partitioning, new TimePartitioningToJson()));

    }



    /** The same as {@link #withTimePartitioning}, but takes a JSON-serialized object. */

    public Write<T> withJsonTimePartitioning(ValueProvider<String> partitioning) {

      checkArgument(partitioning != null, "partitioning can not be null");

      return toBuilder().setJsonTimePartitioning(partitioning).build();

    }



    /** Specifies whether the table should be created if it does not exist. */

    public Write<T> withCreateDisposition(CreateDisposition createDisposition) {

      checkArgument(createDisposition != null, "createDisposition can not be null");

      return toBuilder().setCreateDisposition(createDisposition).build();

    }



    /** Specifies what to do with existing data in the table, in case the table already exists. */

    public Write<T> withWriteDisposition(WriteDisposition writeDisposition) {

      checkArgument(writeDisposition != null, "writeDisposition can not be null");

      return toBuilder().setWriteDisposition(writeDisposition).build();

    }



    /** Specifies the table description. */

    public Write<T> withTableDescription(String tableDescription) {

      checkArgument(tableDescription != null, "tableDescription can not be null");

      return toBuilder().setTableDescription(tableDescription).build();

    }



    /**

     * Specifies a policy for handling failed inserts.

     *

     * <p>Currently this only is allowed when writing an unbounded collection to BigQuery. Bounded

     * collections are written using batch load jobs, so we don't get per-element failures.

     * Unbounded collections are written using streaming inserts, so we have access to per-element

     * insert results.
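     *
     * <p>For example, retrying only transient errors might be requested as follows (the
     * {@code retryTransientErrors()} factory shown here is assumed to exist on
     * {@link InsertRetryPolicy}):
     *
     * <pre>{@code
     * write.withFailedInsertRetryPolicy(InsertRetryPolicy.retryTransientErrors());
     * }</pre>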

     */

    public Write<T> withFailedInsertRetryPolicy(InsertRetryPolicy retryPolicy) {

      checkArgument(retryPolicy != null, "retryPolicy can not be null");

      return toBuilder().setFailedInsertRetryPolicy(retryPolicy).build();

    }



    /** Disables BigQuery table validation. */

    public Write<T> withoutValidation() {

      return toBuilder().setValidate(false).build();

    }



    /**

     * Choose the method used to write data to BigQuery. See the Javadoc on {@link Method} for

     * information and restrictions of the different methods.

     */

    public Write<T> withMethod(Method method) {

      checkArgument(method != null, "method can not be null");

      return toBuilder().setMethod(method).build();

    }



    /**

     * Set the project the BigQuery load job will be initiated from. This is only applicable when

     * the write method is set to {@link Method#FILE_LOADS}. If omitted, the project of the

     * destination table is used.

     */

    public Write<T> withLoadJobProjectId(String loadJobProjectId) {

      return withLoadJobProjectId(StaticValueProvider.of(loadJobProjectId));

    }



    public Write<T> withLoadJobProjectId(ValueProvider<String> loadJobProjectId) {

      checkArgument(loadJobProjectId != null, "loadJobProjectId can not be null");

      return toBuilder().setLoadJobProjectId(loadJobProjectId).build();

    }



    /**

     * Choose the frequency at which file writes are triggered.

     *

     * <p>This is only applicable when the write method is set to {@link Method#FILE_LOADS}, and

     * only when writing an unbounded {@link PCollection}.

     *

     * <p>Every triggeringFrequency duration, a BigQuery load job will be generated for all the data

     * written since the last load job. BigQuery has limits on how many load jobs can be triggered

     * per day, so be careful not to set this duration too low, or you may exceed daily quota. Often

     * this is set to 5 or 10 minutes to ensure that the project stays well under the BigQuery

     * quota. See <a href="https://cloud.google.com/bigquery/quota-policy">Quota Policy</a> for more

     * information about BigQuery quotas.
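     *
     * <p>For example, an unbounded write via load jobs could be configured roughly as follows
     * (the five-minute frequency and shard count are just illustrations):
     *
     * <pre>{@code
     * write
     *     .withMethod(Method.FILE_LOADS)
     *     .withTriggeringFrequency(Duration.standardMinutes(5))
     *     .withNumFileShards(100);
     * }</pre>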

     */

    public Write<T> withTriggeringFrequency(Duration triggeringFrequency) {

      checkArgument(triggeringFrequency != null, "triggeringFrequency can not be null");

      return toBuilder().setTriggeringFrequency(triggeringFrequency).build();

    }



    /**

     * Control how many file shards are written when using BigQuery load jobs. Applicable only when

     * also setting {@link #withTriggeringFrequency}.

     */

    @Experimental

    public Write<T> withNumFileShards(int numFileShards) {

      checkArgument(numFileShards > 0, "numFileShards must be > 0, but was: %s", numFileShards);

      return toBuilder().setNumFileShards(numFileShards).build();

    }



    /**

     * Provides a custom location on GCS for storing temporary files to be loaded via BigQuery batch

     * load jobs. See "Usage with templates" in {@link BigQueryIO} documentation for discussion.

     */

    public Write<T> withCustomGcsTempLocation(ValueProvider<String> customGcsTempLocation) {

      checkArgument(customGcsTempLocation != null, "customGcsTempLocation can not be null");

      return toBuilder().setCustomGcsTempLocation(customGcsTempLocation).build();

    }



    /**

     * Enables extended error information by enabling {@link WriteResult#getFailedInsertsWithErr()}.

     *

     * <p>At the moment this only works when using {@link Method#STREAMING_INSERTS}. See {@link

     * Write#withMethod(Method)}.

     */

    public Write<T> withExtendedErrorInfo() {

      return toBuilder().setExtendedErrorInfo(true).build();

    }



    /**

     * Insert all valid rows of a request, even if invalid rows exist. This is only applicable when

     * the write method is set to {@link Method#STREAMING_INSERTS}. The default value is false,

     * which causes the entire request to fail if any invalid rows exist.

     */

    public Write<T> skipInvalidRows() {

      return toBuilder().setSkipInvalidRows(true).build();

    }



    /**

     * Accept rows that contain values that do not match the schema. The unknown values are ignored.

     * Default is false, which treats unknown values as errors.

     */

    public Write<T> ignoreUnknownValues() {

      return toBuilder().setIgnoreUnknownValues(true).build();

    }



    public Write<T> withKmsKey(String kmsKey) {

      return toBuilder().setKmsKey(kmsKey).build();

    }



    /**

     * If true, enables new codepaths that are expected to use less resources while writing to

     * BigQuery. Not enabled by default in order to maintain backwards compatibility.

     */

    @Experimental

    public Write<T> optimizedWrites() {

      return toBuilder().setOptimizeWrites(true).build();

    }



    /**

     * If true, then the BigQuery schema will be inferred from the input schema. If no

     * formatFunction is set, then BigQueryIO will automatically turn the input records into

     * TableRows that match the schema.

     */

    @Experimental

    public Write<T> useUnifiedSchema() {

      return toBuilder().setUseUnifiedSchema(true).build();

    }



    /** This method is for test usage only. */
    @VisibleForTesting

    public Write<T> withTestServices(BigQueryServices testServices) {

      checkArgument(testServices != null, "testServices can not be null");

      return toBuilder().setBigQueryServices(testServices).build();

    }



    @VisibleForTesting

    Write<T> withMaxFilesPerBundle(int maxFilesPerBundle) {

      checkArgument(

          maxFilesPerBundle > 0, "maxFilesPerBundle must be > 0, but was: %s", maxFilesPerBundle);

      return toBuilder().setMaxFilesPerBundle(maxFilesPerBundle).build();

    }



    @VisibleForTesting

    Write<T> withMaxFileSize(long maxFileSize) {

      checkArgument(maxFileSize > 0, "maxFileSize must be > 0, but was: %s", maxFileSize);

      return toBuilder().setMaxFileSize(maxFileSize).build();

    }



    @VisibleForTesting

    Write<T> withMaxFilesPerPartition(int maxFilesPerPartition) {

      checkArgument(

          maxFilesPerPartition > 0,

          "maxFilesPerPartition must be > 0, but was: %s",

          maxFilesPerPartition);

      return toBuilder().setMaxFilesPerPartition(maxFilesPerPartition).build();

    }



    @VisibleForTesting

    Write<T> withMaxBytesPerPartition(long maxBytesPerPartition) {

      checkArgument(

          maxBytesPerPartition > 0,

          "maxFilesPerPartition must be > 0, but was: %s",

          maxBytesPerPartition);

      return toBuilder().setMaxBytesPerPartition(maxBytesPerPartition).build();

    }



    @Override

    public void validate(PipelineOptions pipelineOptions) {

      BigQueryOptions options = pipelineOptions.as(BigQueryOptions.class);



      // The user specified a table.

      if (getJsonTableRef() != null && getJsonTableRef().isAccessible() && getValidate()) {

        TableReference table = getTableWithDefaultProject(options).get();

        DatasetService datasetService = getBigQueryServices().getDatasetService(options);

        // Check for destination table presence and emptiness for early failure notification.

        // Note that a presence check can fail when the table or dataset is created by an earlier

        // stage of the pipeline. For these cases the #withoutValidation method can be used to

        // disable the check.

        BigQueryHelpers.verifyDatasetPresence(datasetService, table);

        if (getCreateDisposition() == BigQueryIO.Write.CreateDisposition.CREATE_NEVER) {

          BigQueryHelpers.verifyTablePresence(datasetService, table);

        }

        if (getWriteDisposition() == BigQueryIO.Write.WriteDisposition.WRITE_EMPTY) {

          BigQueryHelpers.verifyTableNotExistOrEmpty(datasetService, table);

        }

      }

    }



    private Method resolveMethod(PCollection<T> input) {

      if (getMethod() != Method.DEFAULT) {

        return getMethod();

      }

      // By default, when writing an Unbounded PCollection, we use StreamingInserts and

      // BigQuery's streaming import API.

      return (input.isBounded() == IsBounded.UNBOUNDED)

          ? Method.STREAMING_INSERTS

          : Method.FILE_LOADS;

    }



    @Override

    public WriteResult expand(PCollection<T> input) {

      // We must have a destination to write to!

      checkArgument(

          getTableFunction() != null

              || getJsonTableRef() != null

              || getDynamicDestinations() != null,

          "must set the table reference of a BigQueryIO.Write transform");



      List<?> allToArgs =

          Lists.newArrayList(getJsonTableRef(), getTableFunction(), getDynamicDestinations());

      checkArgument(

          1

              == Iterables.size(

                  allToArgs.stream()

                      .filter(Predicates.notNull()::apply)

                      .collect(Collectors.toList())),

          "Exactly one of jsonTableRef, tableFunction, or " + "dynamicDestinations must be set");



      List<?> allSchemaArgs =

          Lists.newArrayList(getJsonSchema(), getSchemaFromView(), getDynamicDestinations());

      checkArgument(

          2

              > Iterables.size(

                  allSchemaArgs.stream()

                      .filter(Predicates.notNull()::apply)

                      .collect(Collectors.toList())),

          "No more than one of jsonSchema, schemaFromView, or dynamicDestinations may " + "be set");



      Method method = resolveMethod(input);

      if (input.isBounded() == IsBounded.UNBOUNDED && method == Method.FILE_LOADS) {

        checkArgument(

            getTriggeringFrequency() != null,

            "When writing an unbounded PCollection via FILE_LOADS, "

                + "triggering frequency must be specified");

      } else {

        checkArgument(

            getTriggeringFrequency() == null && getNumFileShards() == 0,

            "Triggering frequency or number of file shards can be specified only when writing "

                + "an unbounded PCollection via FILE_LOADS, but: the collection was %s "

                + "and the method was %s",

            input.isBounded(),

            method);

      }

      if (getJsonTimePartitioning() != null) {

        checkArgument(

            getDynamicDestinations() == null,

            "The supplied DynamicDestinations object can directly set TimePartitioning."

                + " There is no need to call BigQueryIO.Write.withTimePartitioning.");

        checkArgument(

            getTableFunction() == null,

            "The supplied getTableFunction object can directly set TimePartitioning."

                + " There is no need to call BigQueryIO.Write.withTimePartitioning.");

      }



      DynamicDestinations<T, ?> dynamicDestinations = getDynamicDestinations();

      if (dynamicDestinations == null) {

        if (getJsonTableRef() != null) {

          dynamicDestinations =

              DynamicDestinationsHelpers.ConstantTableDestinations.fromJsonTableRef(

                  getJsonTableRef(), getTableDescription());

        } else if (getTableFunction() != null) {

          dynamicDestinations = new TableFunctionDestinations<>(getTableFunction());

        }



        // Wrap with a DynamicDestinations class that will provide a schema. There might be no

        // schema provided if the create disposition is CREATE_NEVER.

        if (getJsonSchema() != null) {

          dynamicDestinations =

              new ConstantSchemaDestinations<>(

                  (DynamicDestinations<T, TableDestination>) dynamicDestinations, getJsonSchema());

        } else if (getSchemaFromView() != null) {

          dynamicDestinations =

              new SchemaFromViewDestinations<>(

                  (DynamicDestinations<T, TableDestination>) dynamicDestinations,

                  getSchemaFromView());

        }



        // Wrap with a DynamicDestinations class that will provide the proper TimePartitioning.

        if (getJsonTimePartitioning() != null) {

          dynamicDestinations =

              new ConstantTimePartitioningDestinations<>(

                  (DynamicDestinations<T, TableDestination>) dynamicDestinations,

                  getJsonTimePartitioning());

        }

      }

      return expandTyped(input, dynamicDestinations);

    }



    private <DestinationT> WriteResult expandTyped(

        PCollection<T> input, DynamicDestinations<T, DestinationT> dynamicDestinations) {

      boolean optimizeWrites = getOptimizeWrites();

      SerializableFunction<T, TableRow> formatFunction = getFormatFunction();

      if (getUseUnifiedSchema()) {

        checkArgument(input.hasSchema());

        optimizeWrites = true;

        if (formatFunction == null) {

          // If no format function set, then we will automatically convert the input type to a

          // TableRow.

          formatFunction = BigQueryUtils.toTableRow(input.getToRowFunction());

        }

        // Infer the TableSchema from the input Unified schema.

        TableSchema tableSchema = BigQueryUtils.toTableSchema(input.getSchema());

        dynamicDestinations =

            new ConstantSchemaDestinations<>(

                dynamicDestinations,

                StaticValueProvider.of(BigQueryHelpers.toJsonString(tableSchema)));

      } else {

        // Require a schema if creating one or more tables.

        checkArgument(

            getCreateDisposition() != CreateDisposition.CREATE_IF_NEEDED

                || getJsonSchema() != null

                || getDynamicDestinations() != null

                || getSchemaFromView() != null,

            "CreateDisposition is CREATE_IF_NEEDED, however no schema was provided.");

      }



      checkArgument(

          formatFunction != null,

          "A function must be provided to convert type into a TableRow. "

              + "use BigQueryIO.Write.withFormatFunction to provide a formatting function."

              + "A format function is not required if Unified schemas are used.");



      Coder<DestinationT> destinationCoder = null;

      try {

        destinationCoder =

            dynamicDestinations.getDestinationCoderWithDefault(

                input.getPipeline().getCoderRegistry());

      } catch (CannotProvideCoderException e) {

        throw new RuntimeException(e);

      }



      Method method = resolveMethod(input);

      if (optimizeWrites) {

        PCollection<KV<DestinationT, T>> rowsWithDestination =

            input

                .apply(

                    "PrepareWrite",

                    new PrepareWrite<>(dynamicDestinations, SerializableFunctions.identity()))

                .setCoder(KvCoder.of(destinationCoder, input.getCoder()));

        return continueExpandTyped(

            rowsWithDestination,

            input.getCoder(),

            destinationCoder,

            dynamicDestinations,

            formatFunction,

            method);

      } else {

        PCollection<KV<DestinationT, TableRow>> rowsWithDestination =

            input

                .apply("PrepareWrite", new PrepareWrite<>(dynamicDestinations, formatFunction))

                .setCoder(KvCoder.of(destinationCoder, TableRowJsonCoder.of()));

        return continueExpandTyped(

            rowsWithDestination,

            TableRowJsonCoder.of(),

            destinationCoder,

            dynamicDestinations,

            SerializableFunctions.identity(),

            method);

      }

    }



    private <DestinationT, ElementT> WriteResult continueExpandTyped(

        PCollection<KV<DestinationT, ElementT>> input,

        Coder<ElementT> elementCoder,

        Coder<DestinationT> destinationCoder,

        DynamicDestinations<T, DestinationT> dynamicDestinations,

        SerializableFunction<ElementT, TableRow> toRowFunction,

        Method method) {

      if (method == Method.STREAMING_INSERTS) {

        checkArgument(

            getWriteDisposition() != WriteDisposition.WRITE_TRUNCATE,

            "WriteDisposition.WRITE_TRUNCATE is not supported for an unbounded" + " PCollection.");

        InsertRetryPolicy retryPolicy =

            MoreObjects.firstNonNull(getFailedInsertRetryPolicy(), InsertRetryPolicy.alwaysRetry());



        StreamingInserts<DestinationT, ElementT> streamingInserts =

            new StreamingInserts<>(

                    getCreateDisposition(), dynamicDestinations, elementCoder, toRowFunction)

                .withInsertRetryPolicy(retryPolicy)

                .withTestServices(getBigQueryServices())

                .withExtendedErrorInfo(getExtendedErrorInfo())

                .withSkipInvalidRows(getSkipInvalidRows())

                .withIgnoreUnknownValues(getIgnoreUnknownValues())

                .withKmsKey(getKmsKey());

        return input.apply(streamingInserts);

      } else {

        checkArgument(

            getFailedInsertRetryPolicy() == null,

            "Record-insert retry policies are not supported when using BigQuery load jobs.");



        BatchLoads<DestinationT, ElementT> batchLoads =

            new BatchLoads<>(

                getWriteDisposition(),

                getCreateDisposition(),

                getJsonTableRef() != null,

                dynamicDestinations,

                destinationCoder,

                getCustomGcsTempLocation(),

                getLoadJobProjectId(),

                getIgnoreUnknownValues(),

                elementCoder,

                toRowFunction,

                getKmsKey());

        batchLoads.setTestServices(getBigQueryServices());

        if (getMaxFilesPerBundle() != null) {

          batchLoads.setMaxNumWritersPerBundle(getMaxFilesPerBundle());

        }

        if (getMaxFileSize() != null) {

          batchLoads.setMaxFileSize(getMaxFileSize());

        }

        batchLoads.setMaxFilesPerPartition(getMaxFilesPerPartition());

        batchLoads.setMaxBytesPerPartition(getMaxBytesPerPartition());



        // When running in streaming (unbounded mode) we want to retry failed load jobs

        // indefinitely. Failing the bundle is expensive, so we set a fairly high limit on retries.

        if (IsBounded.UNBOUNDED.equals(input.isBounded())) {

          batchLoads.setMaxRetryJobs(1000);

        }

        batchLoads.setTriggeringFrequency(getTriggeringFrequency());

        batchLoads.setNumFileShards(getNumFileShards());

        return input.apply(batchLoads);

      }

    }



    @Override

    public void populateDisplayData(DisplayData.Builder builder) {

      super.populateDisplayData(builder);



      builder.addIfNotNull(

          DisplayData.item("table", getJsonTableRef()).withLabel("Table Reference"));

      if (getJsonSchema() != null) {

        builder.addIfNotNull(DisplayData.item("schema", getJsonSchema()).withLabel("Table Schema"));

      } else {

        builder.add(DisplayData.item("schema", "Custom Schema Function").withLabel("Table Schema"));

      }



      if (getTableFunction() != null) {

        builder.add(

            DisplayData.item("tableFn", getTableFunction().getClass())

                .withLabel("Table Reference Function"));

      }



      builder

          .add(

              DisplayData.item("createDisposition", getCreateDisposition().toString())

                  .withLabel("Table CreateDisposition"))

          .add(

              DisplayData.item("writeDisposition", getWriteDisposition().toString())

                  .withLabel("Table WriteDisposition"))

          .addIfNotDefault(

              DisplayData.item("validation", getValidate()).withLabel("Validation Enabled"), true)

          .addIfNotNull(

              DisplayData.item("tableDescription", getTableDescription())

                  .withLabel("Table Description"));

    }



    /**

     * Returns the table to write, or {@code null} if writing with {@code tableFunction}.

     *

     * <p>If the table's project is not specified, use the executing project.

     */

    @Nullable

    ValueProvider<TableReference> getTableWithDefaultProject(BigQueryOptions bqOptions) {

      ValueProvider<TableReference> table = getTable();

      if (table == null) {

        return table;

      }



      if (!table.isAccessible()) {

        LOG.info(

            "Using a dynamic value for table input. This must contain a project"

                + " in the table reference: {}",

            table);

        return table;

      }

      if (Strings.isNullOrEmpty(table.get().getProjectId())) {

        // If user does not specify a project we assume the table to be located in

        // the default project.

        TableReference tableRef = table.get();

        tableRef.setProjectId(bqOptions.getProject());

        return NestedValueProvider.of(

            StaticValueProvider.of(BigQueryHelpers.toJsonString(tableRef)),

            new JsonTableRefToTableRef());

      }

      return table;

    }



    /** Returns the table reference, or {@code null}. */

    @Nullable

    public ValueProvider<TableReference> getTable() {

      return getJsonTableRef() == null

          ? null

          : NestedValueProvider.of(getJsonTableRef(), new JsonTableRefToTableRef());

    }

  }



  /** Clear the cached map of created tables. Used for testing. */

  @VisibleForTesting

  static void clearCreatedTables() {

    CreateTables.clearCreatedTables();

  }



  /////////////////////////////////////////////////////////////////////////////



  /** Disallow construction of utility class. */

  private BigQueryIO() {}

}